package com.jfire.socket.socketserver.handler;

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.jfire.baseutil.order.AescComparator;
import com.jfire.baseutil.order.DescComparator;
import com.jfire.socket.socketserver.bus.Message;
import com.jfire.socket.socketserver.interceptor.MessageInterceptor;

/**
 * Message action: takes one queued message through the inbound interceptors,
 * the matching request handler and the outbound interceptors, then attempts to
 * write it to the channel. Implements {@link Callable} so it can be submitted
 * to a thread pool.
 * 
 * @author linbin
 * 
 */
public final class MessageAction implements Callable<Void>
{
    // One logger for the class instead of one lookup per instance.
    private static final Logger        logger = LogManager.getLogger();
    // Messages waiting to be processed; each call() consumes at most one.
    private final Queue<Message>       queue  = new ConcurrentLinkedQueue<>();
    // Interceptors sorted with AescComparator, used on the inbound side.
    private final MessageInterceptor[] inInterceptors;
    // The same interceptors sorted with DescComparator, used on the outbound side.
    private final MessageInterceptor[] outInterceptors;
    private final int                  interceptorNum;
    // Command array: command[i] is the packet type handlers[i] is interested in.
    private final byte[]               command;
    // Length of the command array (equals handlers.length).
    private final int                  limit;
    private final MessageHandler[]     handlers;
    
    /**
     * Initializes the action from the given interceptors and handlers. The
     * interceptors are sorted into an inbound order and an outbound order. The
     * sorting is done on a private copy, so the array passed in is left
     * untouched.
     * 
     * @param interceptors interceptors applied around message handling
     * @param handlers handlers; each one is selected by the value returned from
     *            its {@code interestedDataPacketType()}
     */
    public MessageAction(MessageInterceptor[] interceptors, MessageHandler[] handlers)
    {
        // Copy before sorting so the caller's array is not reordered in place.
        this.inInterceptors = interceptors.clone();
        Arrays.sort(this.inInterceptors, new AescComparator());
        interceptorNum = inInterceptors.length;
        outInterceptors = Arrays.copyOf(inInterceptors, interceptorNum);
        Arrays.sort(outInterceptors, new DescComparator());
        this.handlers = handlers;
        limit = handlers.length;
        command = new byte[limit];
        for (int i = 0; i < limit; i++)
        {
            command[i] = handlers[i].interestedDataPacketType();
        }
    }
    
    /**
     * Processes the next queued message, if any.
     * <p>
     * Returns immediately when the queue is empty or the message's channel is
     * already closed. The channel is closed when no handler matches the message
     * command, when an interceptor chain ends with
     * {@code MessageInterceptor.closed}, or when processing throws an exception.
     * 
     * @return always {@code null}
     * @throws Exception declared by {@link Callable}; exceptions raised while
     *             processing the message are caught and logged here
     */
    @Override
    public Void call() throws Exception
    {
        Message msg = queue.poll();
        if (msg == null)
        {
            // poll() returns null on an empty queue; nothing to process.
            logger.debug("消息队列为空，没有可处理的消息，直接退出");
            return null;
        }
        if (!msg.getChannelInfo().isOpen())
        {
            logger.debug("通道{}已经关闭，不处理消息，直接退出", msg.getChannelInfo().getAddress());
            return null;
        }
        try
        {
            int handlerIndex = findHandlerIndex(msg.getCommand());
            if (handlerIndex < 0)
            {
                // Unrecognized message type: close the channel directly.
                msg.getChannelInfo().close();
                return null;
            }
            int interceptorIndex = 0;
            int result = -1;
            // Run inbound interceptors in order; a result greater than 1 stops
            // the chain. NOTE(review): presumably `closed` and `interceptored`
            // are such values -- confirm against MessageInterceptor.
            while (interceptorIndex < interceptorNum)
            {
                result = inInterceptors[interceptorIndex].inInterceptor(msg);
                interceptorIndex++;
                if (result > 1)
                {
                    break;
                }
            }
            if (result == MessageInterceptor.closed)
            {
                msg.getChannelInfo().close();
                return null;
            }
            if (result != MessageInterceptor.interceptored)
            {
                handlers[handlerIndex].handler(msg);
            }
            // Only the last `interceptorIndex` entries of the outbound array are
            // run. NOTE(review): this assumes the descending order is the exact
            // reverse of the ascending one, so these are the interceptors that
            // ran inbound, in reverse order -- confirm.
            for (int i = interceptorNum - interceptorIndex; i < interceptorNum; i++)
            {
                result = outInterceptors[i].outInterceptor(msg);
                if (result > 1)
                {
                    break;
                }
            }
            if (result == MessageInterceptor.closed)
            {
                msg.getChannelInfo().close();
                return null;
            }
            msg.flowFinish();
            if (msg.getChannelInfo().isOpen())
            {
                msg.tryWrite();
            }
            else
            {
                logger.debug("通道{}已经关闭，不发送消息", msg.getChannelInfo().getAddress());
            }
        }
        catch (Exception e)
        {
            logger.error("消息处理过程发生异常，关闭通道，当前buffer状态{}", msg.getWriteBuffer(), e);
            msg.getChannelInfo().close();
        }
        return null;
    }
    
    /**
     * Enqueues a message to be processed by a later {@link #call()}.
     * 
     * @param message the message to enqueue
     */
    public void addMessage(Message message)
    {
        queue.add(message);
    }
    
    /**
     * Finds the first handler whose registered command equals the given one.
     * 
     * @param msgCommand command byte of the message
     * @return index into {@code handlers}, or {@code -1} when none matches
     */
    private int findHandlerIndex(byte msgCommand)
    {
        for (int i = 0; i < limit; i++)
        {
            if (command[i] == msgCommand)
            {
                return i;
            }
        }
        return -1;
    }
    
}
